package com.vaye.im.websocket;

import cn.dev33.satoken.session.SaSession;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.dfa.SensitiveUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.vaye.common.constant.CacheConstant;
import com.vaye.common.constant.ImConstant;
import com.vaye.common.enums.TopicTypeEnums;
import com.vaye.common.utils.RedisUtil;
import com.vaye.common.utils.SpringContextUtils;
import com.vaye.common.utils.UserSessionUtils;
import com.vaye.im.dto.TopicMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 该类是对{@link ProductWebSocket2}的升级，升级内容：发送人的信息从token中获取，不需要再通过路径传递发送人的userId
 */
@Slf4j
@Component
@ServerEndpoint(value = "/im21/{targetUserId}",configurator = WebSocketConfigurator.class) //添加消息编码器
public class ProductWebSocket21 {

    private StringRedisTemplate stringRedisTemplate = SpringContextUtils.getBean(StringRedisTemplate.class);

    private static RedisUtil redisUtil;

    @Autowired
    public void setRedisUtil(RedisUtil redisUtil) {
        ProductWebSocket21.redisUtil = redisUtil;
    }

    @OnOpen
    public void onOpen(Session session){
        log.info("ProductWebSocket21-开始建立链接");
        Map<String, Object> userProperties = session.getUserProperties();
        Long loginId = (Long) userProperties.get(ImConstant.LOGINID);
        String host = (String) userProperties.get(ImConstant.HOST);
        String nick = ((JSONObject) StpUtil.getSessionByLoginId(loginId).getDataMap().get(SaSession.USER)).getString("nick");
        UserSessionUtils.add(loginId,session,nick);
        if (ObjectUtils.isEmpty(loginId)) {
            log.error("发送人不能为空");
            throw new IllegalArgumentException("发送人不能为空");
        }
        log.info("websocket 新客户端连入，userName={},host={}",nick,host);
        addOnlineCount();
        log.info("当前在线人数：{}人",getOnlineCount());
        send(loginId,null, String.format("%s:欢迎%s登录","服务器",nick));
    }

    /**
     * 服务端接收到信息后调用
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(@PathParam("targetUserId") Long targetUserId, String message, Session session) {
        Map<String, Object> userProperties = session.getUserProperties();
        Long loginId = (Long) userProperties.get(ImConstant.LOGINID);
        String host = (String) userProperties.get(ImConstant.HOST);
        log.info("ProductWebSocket21.onMessage userName={},msg={},host={}",UserSessionUtils.getNickName(loginId),message,host);
        try {
            if (ObjectUtils.isEmpty(targetUserId)) {
                session.getBasicRemote().sendText("the params of targetUserId is empty,send msg error");
                return;
            }
            if (ObjectUtils.isEmpty(targetUserId)) {
                send(loginId,targetUserId,message);
            } else {
                TopicMsg topicMsg = new TopicMsg();
                topicMsg.setType(TopicTypeEnums.PEER_TO_PEER.getCode());
                topicMsg.setFromUserId(String.valueOf(loginId));
                topicMsg.setTargetUserId(String.valueOf(targetUserId));
                topicMsg.setMsg(message);
                String body = JSON.toJSONString(topicMsg);
                stringRedisTemplate.convertAndSend(CacheConstant.WS_CHAT_TOPIC,body);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接关闭时调用
     */
    @OnClose
    public void onClose(Session session) {
        Map<String, Object> userProperties = session.getUserProperties();
        Long loginId = (Long) userProperties.get(ImConstant.LOGINID);
        String host = (String) userProperties.get(ImConstant.HOST);
        log.info("一个客户端关闭连接,客户端userName={},host={}",UserSessionUtils.getNickName(loginId),host);
        UserSessionUtils.remove(loginId);
        subOnlineCount();
        log.info("当前在线人数：{}",getOnlineCount());
    }

    /**
     * 服务端websocket出错时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("websocket出现错误");
        error.printStackTrace();
    }

    public static synchronized void subOnlineCount() {
        int i = (int) redisUtil.get(CacheConstant.ONLINE_COUNT);
        if (i > 0) {
            redisUtil.decr(CacheConstant.ONLINE_COUNT,1L);
        }
    }

    public static synchronized int getOnlineCount() {
        return (int)redisUtil.get(CacheConstant.ONLINE_COUNT);
    }

    /**
     * 服务端发送信息给客户端
     * @param fromUserId 发送用户ID
     * @param targetUserId 用户ID
     * @param message 发送的消息
     */
    public void send(Long fromUserId,Long targetUserId, String message){
        log.info("#### 点对点消息，userId={}", targetUserId);
        if (SensitiveUtil.containsSensitive(message)) {
            message = SensitiveUtil.sensitiveFilter(message,true,null);
        }
        if (UserSessionUtils.isEmpty()) {
            log.warn("当前无websocket连接");
            return;
        }
        try {
            if (ObjectUtils.isEmpty(targetUserId)) {
                UserSessionUtils.getSession(fromUserId).getBasicRemote().sendText(message);
                message = String.format("%s:%s",UserSessionUtils.getNickName(fromUserId),message);
            }else {
                message = String.format("%s:%s",UserSessionUtils.getNickName(fromUserId),message);
                UserSessionUtils.getSession(targetUserId).getBasicRemote().sendText(message);
            }
        } catch (IOException exception) {
            log.error("点对点消息发送消息失败，fromUserId={},targetUserId={},message={}",fromUserId,targetUserId,message);
            exception.printStackTrace();
        }
    }

    public static synchronized void addOnlineCount() {
        redisUtil.incr(CacheConstant.ONLINE_COUNT,1L);
    }
}
